{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "5085c7e3-2f73-4539-9037-bed5ce6cf673",
   "metadata": {},
   "source": [
    "# Magika: AI powered fast and efficient file type identification using OpenVINO\n",
    "\n",
    "Magika is a novel AI powered file type detection tool that relies on the recent advance of deep learning to provide accurate detection. Under the hood, Magika employs a custom, highly optimized model that only weighs about 1MB, and enables precise file identification within milliseconds, even when running on a single CPU.\n",
    "\n",
    "\n",
    "## Why identifying file type is difficult\n",
    "\n",
    "Since the early days of computing, accurately detecting file types has been crucial in determining how to process files. Linux comes equipped with `libmagic` and the `file` utility, which have served as the de facto standard for file type identification for over 50 years. Today web browsers, code editors, and countless other software rely on file-type detection to decide how to properly render a file. For example, modern code editors use file-type detection to choose which syntax coloring scheme to use as the developer starts typing in a new file.\n",
    "\n",
    "Accurate file-type detection is a notoriously difficult problem because each file format has a different structure, or no structure at all. This is particularly challenging for textual formats and programming languages as they have very similar constructs. So far, `libmagic` and most other file-type-identification software have been relying on a handcrafted collection of heuristics and custom rules to detect each file format.\n",
    "\n",
    "This manual approach is both time consuming and error prone as it is hard for humans to create generalized rules by hand. In particular for security applications, creating dependable detection is especially challenging as attackers are constantly attempting to confuse detection with adversarially-crafted payloads.\n",
    "\n",
    "To address this issue and provide fast and accurate file-type detection Magika was developed. More details about approach and model can be found in original [repo](https://github.com/google/magika) and [Google's blog post](https://opensource.googleblog.com/2024/02/magika-ai-powered-fast-and-efficient-file-type-identification.html).\n",
    "\n",
    "In this tutorial we consider how to bring OpenVINO power into Magika.\n",
    "#### Table of contents:\n",
    "\n",
    "- [Prerequisites](#Prerequisites)\n",
    "- [Define model loading class](#Define-model-loading-class)\n",
    "- [Run OpenVINO model inference](#Run-OpenVINO-model-inference)\n",
    "    - [Select device](#Select-device)\n",
    "    - [Create model](#Create-model)\n",
    "    - [Run inference on bytes input](#Run-inference-on-bytes-input)\n",
    "    - [Run inference on file input](#Run-inference-on-file-input)\n",
    "- [Interactive demo](#Interactive-demo)\n",
    "\n",
    "\n",
    "### Installation Instructions\n",
    "\n",
    "This is a self-contained example that relies solely on its own code.\n",
    "\n",
    "We recommend  running the notebook in a virtual environment. You only need a Jupyter server to start.\n",
    "For details, please refer to [Installation Guide](https://github.com/openvinotoolkit/openvino_notebooks/blob/latest/README.md#-installation-guide).\n",
    "\n",
    "<img referrerpolicy=\"no-referrer-when-downgrade\" src=\"https://static.scarf.sh/a.png?x-pxid=5b5a4db0-7875-4bfb-bdbd-01698b5b1a77&file=notebooks/magika-content-type-recognition/magika-content-type-recognition.ipynb\" />\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "eae002d8-bdb1-4720-8613-1f1933c44524",
   "metadata": {},
   "source": [
    "## Prerequisites\n",
    "[back to top ⬆️](#Table-of-contents:)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "91f4a127-0133-4daa-9021-f62dec73625b",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m A new release of pip is available: \u001b[0m\u001b[31;49m25.0\u001b[0m\u001b[39;49m -> \u001b[0m\u001b[32;49m25.0.1\u001b[0m\n",
      "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n",
      "Note: you may need to restart the kernel to use updated packages.\n"
     ]
    }
   ],
   "source": [
    "%pip install -qU onnxruntime \"magika>=0.6.2\" \"openvino>=2025.2.0\" \"gradio>=4.19\""
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "b384b9a5-2aed-4cc3-9977-23f62263850d",
   "metadata": {},
   "source": [
    "## Define model loading class\n",
    "[back to top ⬆️](#Table-of-contents:)\n",
    "\n",
    "At inference time Magika uses ONNX as an inference engine to ensure files are identified in a matter of milliseconds, almost as fast as a non-AI tool even on CPU. The code below extending original Magika inference class with OpenVINO API.\n",
    "The provided code is fully compatible with original [Magika Python API](https://github.com/google/magika/blob/main/docs/python.md). "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cf66397d-b6b1-4c39-844c-6dc665c53512",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import time\n",
    "from pathlib import Path\n",
    "from functools import partial\n",
    "from typing import Optional\n",
    "\n",
    "from magika import Magika\n",
    "from magika.types import ModelFeatures, ModelOutput, MagikaResult\n",
    "\n",
    "try:\n",
    "    from magika.types.prediction_mode import PredictionMode\n",
    "except ImportError:\n",
    "    from magika.prediction_mode import PredictionMode\n",
    "import numpy.typing as npt\n",
    "import numpy as np\n",
    "\n",
    "import openvino as ov\n",
    "\n",
    "import requests\n",
    "\n",
    "if not Path(\"notebook_utils.py\").exists():\n",
    "    r = requests.get(\n",
    "        url=\"https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/notebook_utils.py\",\n",
    "    )\n",
    "    open(\"notebook_utils.py\", \"w\").write(r.text)\n",
    "\n",
    "\n",
    "# Read more about telemetry collection at https://github.com/openvinotoolkit/openvino_notebooks?tab=readme-ov-file#-telemetry\n",
    "from notebook_utils import collect_telemetry\n",
    "\n",
    "collect_telemetry(\"magika-content-type-recognition.ipynb\")\n",
    "\n",
    "\n",
    "class OVMagika(Magika):\n",
    "    def __init__(\n",
    "        self,\n",
    "        model_dir: Optional[Path] = None,\n",
    "        prediction_mode: PredictionMode = PredictionMode.HIGH_CONFIDENCE,\n",
    "        no_dereference: bool = False,\n",
    "        verbose: bool = False,\n",
    "        debug: bool = False,\n",
    "        use_colors: bool = False,\n",
    "        device=\"CPU\",\n",
    "    ) -> None:\n",
    "        self._device = device\n",
    "        super().__init__(model_dir, prediction_mode, no_dereference, verbose, debug, use_colors)\n",
    "\n",
    "    def _init_onnx_session(self):\n",
    "        # overload model loading using OpenVINO\n",
    "        start_time = time.time()\n",
    "        core = ov.Core()\n",
    "        ov_model = core.compile_model(self._model_path, self._device.upper())\n",
    "        elapsed_time = 1000 * (time.time() - start_time)\n",
    "        self._log.debug(f'ONNX DL model \"{self._model_path}\" loaded in {elapsed_time:.03f} ms on {self._device}')\n",
    "        return ov_model\n",
    "\n",
    "    def _get_raw_predictions(self, features: list[tuple[Path, ModelFeatures]]) -> npt.NDArray:\n",
    "        \"\"\"\n",
    "        Given a list of (path, features), return a (files_num, features_size)\n",
    "        matrix encoding the predictions.\n",
    "        \"\"\"\n",
    "        start_time = time.time()\n",
    "        X_bytes = []\n",
    "        for _, fs in features:\n",
    "            sample_bytes = []\n",
    "            if self._model_config.beg_size > 0:\n",
    "                sample_bytes.extend(fs.beg[: self._model_config.beg_size])\n",
    "            if self._model_config.mid_size > 0:\n",
    "                sample_bytes.extend(fs.mid[: self._model_config.mid_size])\n",
    "            if self._model_config.end_size > 0:\n",
    "                sample_bytes.extend(fs.end[-self._model_config.end_size :])\n",
    "            X_bytes.append(sample_bytes)\n",
    "        X = np.array(X_bytes, dtype=np.int32)\n",
    "        elapsed_time = 1000 * (time.time() - start_time)\n",
    "        self._log.debug(f\"DL input prepared in {elapsed_time:.03f} ms\")\n",
    "\n",
    "        raw_predictions_list = []\n",
    "        samples_num = X.shape[0]\n",
    "\n",
    "        max_internal_batch_size = 1000\n",
    "        batches_num = samples_num // max_internal_batch_size\n",
    "        if samples_num % max_internal_batch_size != 0:\n",
    "            batches_num += 1\n",
    "\n",
    "        for batch_idx in range(batches_num):\n",
    "            self._log.debug(f\"Getting raw predictions for (internal) batch {batch_idx+1}/{batches_num}\")\n",
    "            start_idx = batch_idx * max_internal_batch_size\n",
    "            end_idx = min((batch_idx + 1) * max_internal_batch_size, samples_num)\n",
    "            batch_raw_predictions = self._onnx_session({\"bytes\": X[start_idx:end_idx, :]})[\"target_label\"]\n",
    "            raw_predictions_list.append(batch_raw_predictions)\n",
    "        elapsed_time = time.time() - start_time\n",
    "        self._log.debug(f\"DL raw prediction in {elapsed_time:.03f} seconds\")\n",
    "        return np.concatenate(raw_predictions_list)\n",
    "\n",
    "    def _get_topk_model_outputs_from_features(self, all_features: list[tuple[Path, ModelFeatures]], k: int = 5) -> list[tuple[Path, list[ModelOutput]]]:\n",
    "        \"\"\"\n",
    "        Helper function for getting top k the highest ranked model results for each feature\n",
    "        \"\"\"\n",
    "        raw_preds = self._get_raw_predictions(all_features)\n",
    "        top_preds_idxs = np.argsort(raw_preds, axis=1)[:, -k:][:, ::-1]\n",
    "        scores = [raw_preds[i, idx] for i, idx in enumerate(top_preds_idxs)]\n",
    "        results = []\n",
    "        for (path, _), scores, top_idxes in zip(all_features, raw_preds, top_preds_idxs):\n",
    "            model_outputs_for_path = []\n",
    "            for idx in top_idxes:\n",
    "                label = self._target_labels_space_np[idx]\n",
    "                score = scores[idx]\n",
    "                model_outputs_for_path.append(ModelOutput(label=label, score=float(score)))\n",
    "            results.append((path, model_outputs_for_path))\n",
    "        return results\n",
    "\n",
    "    def _get_results_from_features_topk(self, all_features: list[tuple[Path, ModelFeatures]], top_k=5) -> dict[str, MagikaResult]:\n",
    "        \"\"\"\n",
    "        Helper function for getting top k the highest ranked model results for each feature\n",
    "        \"\"\"\n",
    "        # We now do inference for those files that need it.\n",
    "\n",
    "        if len(all_features) == 0:\n",
    "            # nothing to be done\n",
    "            return {}\n",
    "\n",
    "        outputs: dict[str, MagikaResult] = {}\n",
    "\n",
    "        for path, model_output in self._get_topk_model_outputs_from_features(all_features, top_k):\n",
    "            # In additional to the content type label from the DL model, we\n",
    "            # also allow for other logic to overwrite such result. For\n",
    "            # debugging and information purposes, the JSON output stores\n",
    "            # both the raw DL model output and the final output we return to\n",
    "            # the user.\n",
    "            results = []\n",
    "            for out in model_output:\n",
    "                output_label, _ = self._get_output_label_from_dl_label_and_score(out.label, out.score)\n",
    "\n",
    "                results.append(\n",
    "                    self._get_result_from_labels_and_score(\n",
    "                        path,\n",
    "                        dl_label=output_label,\n",
    "                        output_label=output_label,\n",
    "                        score=out.score,\n",
    "                    )\n",
    "                )\n",
    "            outputs[str(path)] = results\n",
    "\n",
    "        return outputs\n",
    "\n",
    "    def identify_bytes_topk(self, content: bytes, top_k=5) -> MagikaResult:\n",
    "        # Helper function for getting topk results from bytes\n",
    "        _get_results_from_features = self._get_results_from_features\n",
    "        self._get_results_from_features = partial(self._get_results_from_features_topk, top_k=top_k)\n",
    "        result = super().identify_bytes(content)\n",
    "        self._get_results_from_features = _get_results_from_features\n",
    "        return result"
   ]
  },
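  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "topk-argsort-sketch",
   "metadata": {},
   "source": [
    "The top-k selection used in `_get_topk_model_outputs_from_features` may be easier to follow on a toy example. The snippet below is a minimal sketch and not part of Magika: the `labels` and `raw_preds` values and the `top_k_indices` helper are made up for illustration, and only the `np.argsort` slicing mirrors the class above.\n",
    "\n",
    "```python\n",
    "import numpy as np\n",
    "\n",
    "\n",
    "def top_k_indices(scores: np.ndarray, k: int) -> np.ndarray:\n",
    "    # argsort sorts each row in ascending order, so the last k columns hold\n",
    "    # the k largest scores; reversing them puts the best score first\n",
    "    return np.argsort(scores, axis=1)[:, -k:][:, ::-1]\n",
    "\n",
    "\n",
    "# Hypothetical scores for 2 samples over 4 made-up labels\n",
    "labels = np.array(['markdown', 'python', 'json', 'txt'])\n",
    "raw_preds = np.array(\n",
    "    [\n",
    "        [0.70, 0.10, 0.05, 0.15],\n",
    "        [0.20, 0.50, 0.25, 0.05],\n",
    "    ]\n",
    ")\n",
    "\n",
    "for row, idxs in zip(raw_preds, top_k_indices(raw_preds, k=2)):\n",
    "    print([(str(labels[i]), float(row[i])) for i in idxs])\n",
    "```\n",
    "\n",
    "Each printed list holds `(label, score)` pairs for one sample, ordered from the highest score down, which is the same ordering the class uses when it builds `ModelOutput` objects."
   ]
  },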
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "e70e7055-c051-40d9-8453-ccda80a111c8",
   "metadata": {},
   "source": [
    "## Run OpenVINO model inference\n",
    "[back to top ⬆️](#Table-of-contents:)\n",
    "\n",
    "Now let's check model inference result.\n",
    "\n",
    "### Select device\n",
    "[back to top ⬆️](#Table-of-contents:)\n",
    "\n",
    "For starting work, please, select one of represented devices from dropdown list."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "3bde7725-59c0-4de4-95b2-46cd7bd6ab81",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "0e3cc43da6ef4fba99d8a432124324bc",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Dropdown(description='Device:', index=1, options=('CPU', 'AUTO'), value='AUTO')"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from notebook_utils import device_widget\n",
    "\n",
    "device = device_widget()\n",
    "\n",
    "device"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "4a86889a-b2e6-4896-8e1d-e5a44c6172e3",
   "metadata": {},
   "source": [
    "### Create model\n",
    "[back to top ⬆️](#Table-of-contents:)\n",
    "\n",
    "As we discussed above, our OpenVINO extended `OVMagika` class has the same API like original one. Let's try to create interface instance and launch it on different input formats"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "3a03a8d4-c428-4609-985a-1fb5ca074d06",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "ov_magika = OVMagika(device=device.value)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "18be42dd-5ef9-4b9d-9062-71844533ae77",
   "metadata": {},
   "source": [
    "### Run inference on bytes input\n",
    "[back to top ⬆️](#Table-of-contents:)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "53f40c64-afad-4856-bb1a-fe38bef67317",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Content type: markdown - 82.43%\n"
     ]
    }
   ],
   "source": [
    "result = ov_magika.identify_bytes(b\"# Example\\nThis is an example of markdown!\")\n",
    "print(f\"Content type: {result.output.label} - {result.score * 100:.4}%\")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "7e350aeb-a033-449e-a4b7-cefa27ed7e0f",
   "metadata": {},
   "source": [
    "### Run inference on file input\n",
    "[back to top ⬆️](#Table-of-contents:)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5d5df88a-a9f3-45c5-b340-b93701f39c7f",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Content type: markdown - 99.97%\n"
     ]
    }
   ],
   "source": [
    "import requests\n",
    "\n",
    "input_file = Path(\"./README.md\")\n",
    "if not input_file.exists():\n",
    "    r = requests.get(\"https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/README.md\")\n",
    "    with open(\"README.md\", \"w\") as f:\n",
    "        f.write(r.text)\n",
    "result = ov_magika.identify_path(input_file)\n",
    "print(f\"Content type: {result.output.label} - {result.score * 100:.4}%\")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "010280d4-e3fc-4f51-854d-f6cc9ab6a06c",
   "metadata": {},
   "source": [
    "## Interactive demo\n",
    "[back to top ⬆️](#Table-of-contents:)\n",
    "\n",
    "Now, you can try model on own files. Upload file into input file window, click submit button and look on predicted file types."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fa6c4cce-50b1-40d7-b010-0a8a114f38ce",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "if not Path(\"gradio_helper.py\").exists():\n",
    "    r = requests.get(\n",
    "        url=\"https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/notebooks/magika-content-type-recognition/gradio_helper.py\"\n",
    "    )\n",
    "    open(\"gradio_helper.py\", \"w\").write(r.text)\n",
    "\n",
    "from gradio_helper import make_demo\n",
    "\n",
    "demo = make_demo(ov_magika)\n",
    "\n",
    "try:\n",
    "    demo.launch(debug=True)\n",
    "except Exception:\n",
    "    demo.launch(share=True, debug=True)\n",
    "# if you are launching remotely, specify server_name and server_port\n",
    "# demo.launch(server_name='your server name', server_port='server port in int')\n",
    "# Read more in the docs: https://gradio.app/docs/"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.4"
  },
  "openvino_notebooks": {
   "imageUrl": "https://github.com/openvinotoolkit/openvino_notebooks/assets/29454499/b99cb2c0-d9cb-47a7-ba17-1b4b2eed01da",
   "tags": {
    "categories": [
     "Model Demos",
     "AI Trends"
    ],
    "libraries": [],
    "other": [],
    "tasks": [
     "Bytes Classification"
    ]
   }
  },
  "widgets": {
   "application/vnd.jupyter.widget-state+json": {
    "state": {
     "0e3cc43da6ef4fba99d8a432124324bc": {
      "model_module": "@jupyter-widgets/controls",
      "model_module_version": "2.0.0",
      "model_name": "DropdownModel",
      "state": {
       "_options_labels": [
        "CPU",
        "AUTO"
       ],
       "description": "Device:",
       "index": 1,
       "layout": "IPY_MODEL_d515552eb19847b3a492898a29172af8",
       "style": "IPY_MODEL_bdae71df78e8483d9cb09cab767fde4c"
      }
     },
     "bdae71df78e8483d9cb09cab767fde4c": {
      "model_module": "@jupyter-widgets/controls",
      "model_module_version": "2.0.0",
      "model_name": "DescriptionStyleModel",
      "state": {
       "description_width": ""
      }
     },
     "d515552eb19847b3a492898a29172af8": {
      "model_module": "@jupyter-widgets/base",
      "model_module_version": "2.0.0",
      "model_name": "LayoutModel",
      "state": {}
     }
    },
    "version_major": 2,
    "version_minor": 0
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
